package cn.doitedu.rtmk.demo1;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class RuleEngineCoreFunction1 extends KeyedProcessFunction<Long, UserEvent, JSONObject> {

    @Override
    public void processElement(UserEvent event, KeyedProcessFunction<Long, UserEvent, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
        // 判断该事件 是否满足  规则的触发条件： event_id = ad_click 且  properties['ad_id']='ad001'
        if (event.getEvent_id().equals("ad_click") && event.getProperties().get("ad_id").equals("ad001")) {
            JSONObject resultObj = new JSONObject();
            resultObj.put("user_id", event.getUser_id());
            resultObj.put("hit_time", event.getEvent_time());
            resultObj.put("hit_rule", "rule-001");
            out.collect(resultObj);
        }
    }


}
